
-- This file contains the Python helper functions that other SQL files rely on
-- to complete their correctness tests, such as checking the CPU usage, the
-- bitmap of CPUSET, and whether the Cgroup files exist.
--
-- In Cgroup v1 (Alpha), we check the directories
-- /sys/fs/cgroup/cpu/gpdb
-- /sys/fs/cgroup/cpuacct/gpdb
-- /sys/fs/cgroup/cpuset/gpdb
--
-- In Cgroup v2 (Beta), we check the directory
-- /sys/fs/cgroup/gpdb/*
--
-- Different tests need different auxiliary tool files, so include the
-- matching auxiliary file in the schedule file for the tests being run.

-- start_ignore
-- The helper functions defined in this file are written in plpython3u, so the
-- language has to exist before they are created.
-- NOTE(review): the start_ignore/end_ignore markers presumably tell the test
-- harness not to compare the enclosed output -- confirm against the framework.
CREATE LANGUAGE plpython3u;
-- end_ignore

-- enable resource group and restart cluster.
-- start_ignore
-- switch the resource manager setting to "group"
! gpconfig -c gp_resource_manager -v group;
-- NOTE(review): -m presumably supplies a separate value (25) for the
-- coordinator while -v (250) applies to the segments -- confirm.
! gpconfig -c max_connections -v 250 -m 25;
! gpconfig -c runaway_detector_activation_percent -v 100;
-- restart the cluster so that the settings changed above take effect
! gpstop -rai;
-- end_ignore

-- after the restart we need a new connection to run the queries
-- NOTE(review): the "0:" prefix appears to name the session that runs each
-- statement -- confirm against the test framework.

0: SHOW gp_resource_manager;

-- resource queue statistics should not crash
-- (these views are queried while the resource manager is set to "group")
0: SELECT * FROM pg_resqueue_status;
0: SELECT * FROM gp_toolkit.gp_resqueue_status;
0: SELECT * FROM gp_toolkit.gp_resq_priority_backend;

-- verify the default settings
0: SELECT * from gp_toolkit.gp_resgroup_config;


-- Verify that the cgroup v1 CPU settings under /sys/fs/cgroup/cpu/gpdb match
-- the resource group configuration.
-- The top-level cpu.cfs_quota_us and cpu.shares are compared against the
-- gp_resource_group_cpu_limit and gp_resource_group_cpu_priority GUCs, then
-- the per-group cpu.shares of default_group, admin_group, system_group,
-- rg1_cpu_test and rg2_cpu_test are compared against their catalog values.
-- @return bool: true when every check passes; a failed check raises an
--               AssertionError (reported as an error by the function call)
0: CREATE OR REPLACE FUNCTION check_cgroup_configuration() RETURNS BOOL AS $$
    import os

    root = '/sys/fs/cgroup/'

    def get_cgroup_prop(prop):
        """Return the first line of the cgroup file `prop` (relative to root) as an int."""
        fullpath = os.path.join(root, prop)
        # use a context manager so the handle is closed even when int() raises
        with open(fullpath) as prop_file:
            return int(prop_file.readline())

    def show_guc(guc):
        """Return the current value of the GUC `guc` as reported by SHOW."""
        return plpy.execute('SHOW {}'.format(guc))[0][guc]

    # get top-level cgroup props
    cfs_quota_us = get_cgroup_prop('cpu/gpdb/cpu.cfs_quota_us')
    cfs_period_us = get_cgroup_prop('cpu/gpdb/cpu.cfs_period_us')
    shares = get_cgroup_prop('cpu/gpdb/cpu.shares')

    # get system props
    ncores = os.cpu_count()

    # get global gucs
    gp_resource_group_cpu_limit = float(show_guc('gp_resource_group_cpu_limit'))
    gp_resource_group_cpu_priority = int(show_guc('gp_resource_group_cpu_priority'))

    # cfs_quota_us := cfs_period_us * ncores * gp_resource_group_cpu_limit
    expected_quota = cfs_period_us * ncores * gp_resource_group_cpu_limit
    assert cfs_quota_us == expected_quota, \
        'cpu.cfs_quota_us is {}, expected {}'.format(cfs_quota_us, expected_quota)

    # shares := 1024 * gp_resource_group_cpu_priority
    expected_shares = 1024 * gp_resource_group_cpu_priority
    assert shares == expected_shares, \
        'cpu.shares is {}, expected {}'.format(shares, expected_shares)

    def check_group_shares(name):
        """Assert that the cpu.shares of resource group `name` matches its catalog value."""
        # reslimittype=3 is read here as the group's cpu weight (a percentage
        # that is scaled to 1024 shares below)
        cpu_weight = int(plpy.execute('''
                SELECT value
                  FROM pg_resgroupcapability c, pg_resgroup g
                 WHERE c.resgroupid=g.oid
                   AND reslimittype=3
                   AND g.rsgname='{}'
            '''.format(name))[0]['value'])
        # the cgroup directory of a resource group is named after its oid
        oid = int(plpy.execute('''
                SELECT oid FROM pg_resgroup WHERE rsgname='{}'
            '''.format(name))[0]['oid'])
        sub_shares = get_cgroup_prop('cpu/gpdb/{}/cpu.shares'.format(oid))
        expected_sub_shares = int(cpu_weight * 1024 / 100)
        assert sub_shares == expected_sub_shares, \
            'cpu.shares of group {} is {}, expected {}'.format(name, sub_shares, expected_sub_shares)

    # check default groups
    check_group_shares('default_group')
    check_group_shares('admin_group')
    check_group_shares('system_group')

    # check user groups
    check_group_shares('rg1_cpu_test')
    check_group_shares('rg2_cpu_test')

    return True
$$ LANGUAGE plpython3u;


-- check whether the queries of a resource group run only on the given cores
-- The processes are sampled with `ps -eF` on the host that executes this
-- function, 1000 times with a 10ms pause before each sample.
-- @param grp: the resource group name queries running in
-- @param cpuset: cpu cores which the queries should only be run on, e.g. 0,1
--                or 0-2; an empty string means "the cores listed in
--                /sys/fs/cgroup/cpuset/gpdb/cpuset.cpus"
-- @return bool: true/false indicating whether it corresponds to the rule;
--               a group without any session has nothing to check and
--               yields true
0: CREATE  OR REPLACE FUNCTION check_cpuset(grp TEXT, cpuset TEXT) RETURNS BOOL AS $$
    import subprocess
    import time
    import re

    # the session id shows up in a backend's ps line as "con<N>"
    pt = re.compile(r'con(\d+)')

    def check(expect_cpus, sess_ids):
        # use ps -eF to find all processes which belongs to postgres and in the given sessions
        procs = subprocess.check_output(['ps', '-eF']).decode(errors='replace').split('\n')
        head, proc_stats = procs[0], procs[1:]
        # PSR is the column holding the processor the process is assigned to
        psr_col = head.split().index('PSR')
        cpus = set()
        for proc_stat in proc_stats:
            if 'postgres' not in proc_stat:
                continue
            # A process belongs to the group when its OWN session id is one of
            # the group's sessions. Testing the opposite direction (all group
            # sessions contained in the ids of one process) never matches once
            # the group has more than one session, which would make this check
            # pass without looking at any process.
            if sess_ids.intersection(pt.findall(proc_stat)):
                cpus.add(proc_stat.split()[psr_col].strip())
        return cpus.issubset(expect_cpus)

    def get_all_sess_ids_in_group(group_name):
        sql = "select sess_id from pg_stat_activity where rsgname = '%s'" % group_name
        result = plpy.execute(sql)
        return set([str(r['sess_id']) for r in result])

    def expand_cpuset(conf):
        # turn "0,2-4" into ['0', '2', '3', '4'] (core ids are kept as strings
        # because they are compared with the text of the PSR column)
        cores = []
        for token in conf.split(","):
            token = token.strip()
            if '-' in token:
                interval = token.split("-")
                for num in range(int(interval[0]), int(interval[1]) + 1):
                    cores.append(str(num))
            else:
                cores.append(token)
        return cores

    conf = cpuset
    if conf == '':
        # no explicit core list: fall back to the cores of the top-level gpdb cgroup
        with open("/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus") as fd:
            conf = fd.readline().strip('\n')

    expect_cpu = expand_cpuset(conf)
    sess_ids = get_all_sess_ids_in_group(grp)

    # sample repeatedly: a process may migrate between cores, so a single
    # snapshot is not enough to trust the result
    for i in range(1000):
        time.sleep(0.01)
        if not check(expect_cpu, sess_ids):
            return False

    return True
$$ LANGUAGE plpython3u;

-- create a resource group that contains all the cpu cores
-- The core list is taken from /sys/fs/cgroup/cpuset/gpdb/cpuset.cpus. After
-- the group is created, the cpuset of cgroup "1" is expected to be exactly
-- core 0.
-- @param grp: name of the resource group to create; it is spliced into the
--             statement as-is, so it must be a trusted, valid identifier
-- @return bool: true when /sys/fs/cgroup/cpuset/gpdb/1/cpuset.cpus is "0"
--               after the creation, false otherwise; raises an error when
--               the group cannot be created
0: CREATE OR REPLACE FUNCTION create_allcores_group(grp TEXT) RETURNS BOOL AS $$
    import subprocess

    def read_first_line(path):
        # the context manager closes the file even if the read fails
        with open(path) as fd:
            return fd.readline().strip('\n')

    all_cores = read_first_line("/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus")
    sql = "create resource group " + grp + " with (" + "cpuset='" + all_cores + "')"

    # plpy SPI will always start a transaction, but res group cannot be created in a transaction.
    # text=True decodes the captured output so the error message below shows
    # readable text instead of the repr of a bytes object.
    ret = subprocess.run(['psql', 'postgres', '-c', sql], capture_output=True, text=True)
    if ret.returncode != 0:
        plpy.error('failed to create resource group.\n {} \n {}'.format(ret.stdout, ret.stderr))

    # with every core handed to the new group, cgroup "1" must be left with core 0 only
    return read_first_line("/sys/fs/cgroup/cpuset/gpdb/1/cpuset.cpus") == "0"
$$ LANGUAGE plpython3u;

-- check whether the cpuset values in cgroup are valid according to the rules:
--   * every group with a cpuset configured has the same cores in its cgroup
--     file as in gp_toolkit.gp_resgroup_config
--   * no two of those groups share a core
--   * the cores of those groups plus the cores of the default group (cgroup
--     "1") are exactly the cores of the top-level gpdb cgroup
--   * when the default group shares a core with them, it holds exactly one core
-- @return bool: true when all rules hold, false otherwise
0: CREATE OR REPLACE FUNCTION check_cpuset_rules() RETURNS BOOL AS $$
    def parse_cpuset(line):
        # expand e.g. "0,2-4\n" into {'0', '2', '3', '4'}; empty text gives an empty set
        text = line.strip('\n')
        if not text:
            return set()
        cores = set()
        for part in text.split(","):
            if '-' in part:
                bounds = part.split("-")
                cores.update(str(n) for n in range(int(bounds[0]), int(bounds[1]) + 1))
            else:
                cores.add(part)
        return cores

    def get_cgroup_cpuset(group):
        # group 0 stands for the top-level gpdb cgroup, any other id for its sub-directory
        group = str(group)
        path = "/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus"
        if group != '0':
            path = "/sys/fs/cgroup/cpuset/gpdb/" + group + "/cpuset.cpus"
        with open(path) as fd:
            first_line = fd.readline()
        return parse_cpuset(first_line)

    # all groups which have a cpuset configured
    rows = plpy.execute("select groupid,cpuset from gp_toolkit.gp_resgroup_config where cpuset != '-1'")

    # cores already claimed by the groups visited so far
    claimed = set()

    # check whether cpuset in config and cgroup are same, and have no overlap
    for row in rows:
        from_config = parse_cpuset(row['cpuset'])
        from_cgroup = get_cgroup_cpuset(row['groupid'])
        if claimed & from_cgroup:
            return False
        claimed |= from_cgroup

        if from_config != from_cgroup:
            return False

    # check whether cpuset in resource group union default group is universal set
    default_cpuset = get_cgroup_cpuset(1)
    all_cpuset = get_cgroup_cpuset(0)
    if (default_cpuset | claimed) != all_cpuset:
        return False

    # if all the cores are allocated to resource group, default group must has a core left
    shares_a_core = bool(default_cpuset & claimed)
    single_valid_core = len(default_cpuset) == 1 and default_cpuset <= all_cpuset
    if shares_a_core and not single_valid_core:
        return False

    return True
$$ LANGUAGE plpython3u;


-- check whether all processes of a session are attached to the cgroup of a
-- resource group on every host of the cluster
-- @param pid: pid of a backend of the session, as listed in pg_stat_activity
-- @param groupname: name of the resource group, as listed in
--                   gp_toolkit.gp_resgroup_config
-- @return bool: true when, on every host, each postgres process of the
--               session is listed in /sys/fs/cgroup/cpu/gpdb/<groupid>/cgroup.procs;
--               raises an error when the pid or the group is unknown or a
--               remote command fails
0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$
    import subprocess

    sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid
    result = plpy.execute(sql)
    session_id = result[0]['sess_id']

    sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname='%s'" % groupname
    result = plpy.execute(sql)
    groupid = result[0]['groupid']

    sql = "select hostname from gp_segment_configuration group by hostname"
    result = plpy.execute(sql)
    hosts = [row['hostname'] for row in result]

    def run_on_host(host, command):
        # run `command` over ssh and return its output split into lines (bytes)
        return subprocess.run(["ssh", host, command], capture_output=True, check=True).stdout.splitlines()

    def get_result(host):
        # grep -w matches "con<N>" as a whole word; a plain substring match
        # would let session 1 also pick up the processes of sessions 10, 11,
        # 12, ... and report pids that do not belong to this session.
        session_pids = run_on_host(
            host,
            "ps -ef | grep postgres | grep -w con{} | grep -v grep | awk '{{print $2}}'".format(session_id))

        path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid)
        cgroups_pids = run_on_host(host, "cat {}".format(path))

        return set(session_pids).issubset(set(cgroups_pids))

    for host in hosts:
        if not get_result(host):
            return False
    return True

$$ LANGUAGE plpython3u;
